AWS Step Functionsのステートマシン間の依存関係をネストされたステートマシンで管理してみた

AWS Step Functionsのステートマシン間の依存関係をネストされたステートマシンで管理してみた

Clock Icon2022.11.20

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWS Step Functionsでジョブ管理している際に、ステートマシン間を依存関係を管理したいことがあります。

例えば、前処理とメイン処理がそれぞれ異なるステートマシンで管理されているような場合です。

本記事では、ステートマシンをネストさせて、依存関係を解決する方法を紹介します。

ステートマシンをネスト化して依存関係を解決

今回紹介するのは、Step Functionsのネイティブ機能だけを利用し、ステートマシンの呼び出しをネスト化させて依存関係を解決します。

具体的には、ステートマシンをネストし、親から子のステートマシンを最適化された統合で同期呼び出しし、処理の完了を待ちます。 1:多、多:1の依存関係があるときは、複数のステートマシンを Parallel タスクで管理します。

以下では、よくあるパターンを列挙します。

1:1 の依存関係の場合

まずは最もシンプルなケースです。

後続ステートマシンXが先行するステートマシンAに依存している場合を考えます。

この場合、親ステートマシンを作成し、この親ステートマシンから、ステートマシンAとXをシーケンシャルに同期呼び出し(=完了を待つ)します。

{
  "Comment": "A description of my state machine",
  "StartAt": "StateMachine A",
  "States": {
    "StateMachine A": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Parameters": {
        "StateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:A",
        "Input": {
          "StatePayload": "Hello from Step Functions!",
          "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
        }
      },
      "Next": "StateMachine X"
    },
    "StateMachine X": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Parameters": {
        "StateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:X",
        "Input": {
          "StatePayload": "Hello from Step Functions!",
          "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
        }
      },
      "End": true
    }
  }
}

Workflow Studio での定義方法

Workflow Studio でステートマシンを定義する場合、以下の通り設定します。

Action

  • AWS Step Functions : StartExecution

Configuration

  • Integration type : Optimized
  • Additional configuration : Wait for child execution to complete
  • Response format : JSON(あるいは STRING)

このように設定すると、ステートマシンを呼び出すタスクの Resourcearn:aws:states:::states:startExecution.sync:2 となります。

"Wait for child execution to complete" は .sync に対応し、JSON は :2 に対応します。

.sync オプション(Jobパターン)

.sync 呼び出し(Jobパターン)はスタンダードワークフローの最適化された統合の場合でのみ利用できます。さらに、全ての連携サービスで利用できるわけではありません。 AWS Step Functions や Athena や AWS Batch など、ジョブ実行型のサービスにのみ対応しています。

詳細は次のドキュメントを参照ください。

Call other AWS services - AWS Step Functions

同期呼び出し時の権限

同期呼び出しを選択すると、ジョブのポーリングや完了検知のために、追加の権限が必要です。

ウィザードで IAM ポリシーを生成すると、ステートマシンを呼び出す states:StartExecution アクション以外にも、

  • states:DescribeExecution
  • states:StopExecution

さらには

  • events:PutTargets
  • events:PutRule
  • events:DescribeRule

が許可されています。

Amazon EventBridge を確認すると StepFunctionsGetEventsForStepFunctionsExecutionRule というルールが追加され、Step Functions のステート変更イベントをトラッキングしていることがわかります。 このルールの概要は "This rule is used to notify Step Functions regarding integrated workflow executions"となっています。

{
  "source": [
    "aws.states"
  ],
  "detail-type": [
    "Step Functions Execution Status Change"
  ],
  "detail": {
    "status": [
      "FAILED",
      "SUCCEEDED",
      "TIMED_OUT",
      "ABORTED"
    ]
  }
}

レスポンス形式

レスポンス形式は基本的に JSON を選びましょう。

JSONの場合(startExecution.sync:2)、Output がパース済みです。

{
   <other fields> 
   "Output": {
      "MyKey": "MyValue"
   }
}

STRINGの場合(:2 が接尾しない startExecution.sync)、Output のパースが別途必要です。

{
   <other fields>
   "Output": "{ \"MyKey\": \"MyValue\" }" 
}

複数のステートマシンに依存する場合

次に、後続ステートマシンXが複数のステートマシン(AとB)に依存している場合を考えます。

この場合、依存する複数のステートマシンを Parallel 内で定義します。

上記以外の点、例えば、同期呼び出しである点やネストする点は同じです。

このように定義すると、Parallel 内のすべてのステートマシンが完了したあとで、後続するステートマシンXが処理されます。

{
  "Comment": "A description of my state machine",
  "StartAt": "Parallel",
  "States": {
    "Parallel": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "StateMachine A",
          "States": {
            "StateMachine A": {
              "Type": "Task",
              "Resource": "arn:aws:states:::states:startExecution.sync",
              "Parameters": {
                "StateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:A",
                "Input": {
                  "StatePayload": "Hello from Step Functions!",
                  "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
                }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "StateMachine B",
          "States": {
            "StateMachine B": {
              "Type": "Task",
              "Resource": "arn:aws:states:::states:startExecution.sync:2",
              "Parameters": {
                "StateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:B",
                "Input": {
                  "StatePayload": "Hello from Step Functions!",
                  "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
                }
              },
              "End": true
            }
          }
        }
      ],
      "Next": "StateMachine X"
    },
    "StateMachine X": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Parameters": {
        "StateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:X",
        "Input": {
          "StatePayload": "Hello from Step Functions!",
          "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
        }
      },
      "End": true
    }
  }
}

先程とは逆に、先行するステートマシンの処理完了をトリガーに複数のステートマシンを呼び出したい場合、同じ発想で後続するステートマシンをParalle内で定義します。


{
  "Comment": "A description of my state machine",
  "StartAt": "StateMachine A",
  "States": {
    "StateMachine A": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Parameters": {
        "StateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:A",
        "Input": {
          "StatePayload": "Hello from Step Functions!",
          "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
        }
      },
      "Next": "Parallel"
    },
    "Parallel": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "StateMachine X",
          "States": {
            "StateMachine X": {
              "Type": "Task",
              "Resource": "arn:aws:states:::states:startExecution.sync:2",
              "Parameters": {
                "StateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:X",
                "Input": {
                  "StatePayload": "Hello from Step Functions!",
                  "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
                }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "StateMachine Y",
          "States": {
            "StateMachine Y": {
              "Type": "Task",
              "Resource": "arn:aws:states:::states:startExecution.sync:2",
              "Parameters": {
                "StateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:Y",
                "Input": {
                  "StatePayload": "Hello from Step Functions!",
                  "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
                }
              },
              "End": true
            }
          }
        }
      ],
      "End": true
    }
  }
}

先行:後続=多:多の場合

より複雑に、依存する複数のステートマシンが完了後、後続する複数のステートマシンを呼び出す場合を考えます。

この場合、先行処理と後続処理の両方を Paralel で定義します。

{
  "Comment": "A description of my state machine",
  "StartAt": "Pre-Parallel",
  "States": {
    "Pre-Parallel": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "StateMachine A",
          "States": {
            "StateMachine A": {
              "Type": "Task",
              "Resource": "arn:aws:states:::states:startExecution.sync:2",
              "Parameters": {
                "StateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:A",
                "Input": {
                  "StatePayload": "Hello from Step Functions!",
                  "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
                }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "StateMachine B",
          "States": {
            "StateMachine B": {
              "Type": "Task",
              "Resource": "arn:aws:states:::states:startExecution.sync:2",
              "Parameters": {
                "StateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:B",
                "Input": {
                  "StatePayload": "Hello from Step Functions!",
                  "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
                }
              },
              "End": true
            }
          }
        }
      ],
      "Next": "Post-Parallel"
    },
    "Post-Parallel": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "StateMachine X",
          "States": {
            "StateMachine X": {
              "Type": "Task",
              "Resource": "arn:aws:states:::states:startExecution.sync:2",
              "Parameters": {
                "StateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:X",
                "Input": {
                  "StatePayload": "Hello from Step Functions!",
                  "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
                }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "StateMachine Y",
          "States": {
            "StateMachine Y": {
              "Type": "Task",
              "Resource": "arn:aws:states:::states:startExecution.sync:2",
              "Parameters": {
                "StateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:Y",
                "Input": {
                  "StatePayload": "Hello from Step Functions!",
                  "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
                }
              },
              "End": true
            }
          }
        }
      ],
      "End": true
    }
  }
}

入れ子以外に依存関係の解決方法

ステートマシン間の依存を解決する方法は入れ子方式以外にも存在します。

各ステートマシンの開始時刻をずらす

ナイーブな方法としては、先行ステートマシンを午前0時から、後続ステートマシンを午前4時からというように開始時刻をずらして依存関係を解決します。

設定がシンプルな一方で、先行するステートマシンで失敗や遅延が発生した場合に備え、後続するステートマシン内で、依存するステートマシンが正常終了しているかチェックする必要があります。

CloudWatchアラームで管理

少し凝ったやり方としては、CloudWatch Alarmを設定する方法があります。

[AWS Step Functions] 複数の先行ステートマシンの実行が完了した後に後続ステートマシンを実行してみた | DevelopersIO

先行ステートマシンの実行成功メトリクス(ExecutionsSucceeded)を利用し、過去60分以内に2/2のステートマシンが実行完了したといったことを CloudWatch Alarmで管理します。

条件を満たしたタイミング(状態遷移)で、後続のステートマシンを呼び出します。

最後に

AWS Step Functionsのステートマシン間の依存関係を Step Functionsの機能だけで管理する方法を紹介しました。

ステートマシンのネスト、Jobパターン(同期呼び出し)、複数のステートマシンの管理には Parallel タスク を組み合わせると、基本的な依存関係はカバーできるはずです。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.